python 实现多线程并返回函数返回值的三种方法

您所在的位置:网站首页 python subprocesspopen获取返回值 python 实现多线程并返回函数返回值的三种方法

python 实现多线程并返回函数返回值的三种方法

2024-07-14 11:41| 来源: 网络整理| 查看: 265

方法一:使用threading

在threading中,并没有实现返回值的方法,我们可以用数据库或者是全局变量来实现返回值的获取。这里使用的是全局变量。

from threading import Thread from concurrent.futures import ThreadPoolExecutor, as_completed import Queue import time q = Queue.Queue() def thread_function(age): for i in age: i += 1 q.put( { 'age': i } ) def run_threading(target, args, count): """ :param target: 目标函数 :param args: 函数参数 :param count: 线程数量 """ ts = [] for i in range(count): t = Thread(target=target, args=args) ts.append(t) [i.start() for i in ts] [i.join() for i in ts] if __name__ == '__main__': ages = [1, 3, 4] # 1111 run_threading(thread_function, (ages,), 1) output = [] while not q.empty(): output.append(q.get()) print output

输出:

[{'age': 2}, {'age': 4}, {'age': 5}] Process finished with exit code 0 方法二:使用ThreadPoolExecutor的submit

从Python3.2开始,标准库为我们提供了concurrent.futures模块,它提供了ThreadPoolExecutor和ProcessPoolExecutor两个类,实现了对threading和multiprocessing的进一步抽象。这里主要关注线程池,不仅可以帮我们自动调度线程,还可以做到:

主线程可以获取某一个线程(或者任务的)的状态,以及返回值。当一个线程完成的时候,主线程能够立即知道。让多线程和多进程的编码接口一致。 def thread_function(age): return age+1 def run_thread_pool_sub(target, args, max_work_count=3): with ThreadPoolExecutor(max_workers=max_work_count) as t: res = [t.submit(target, i) for i in args] return res if __name__ == '__main__': ages = [1, 3, 4] res = run_thread_pool_sub(thread_function, ages) for future in as_completed(res): data = future.result() print data

as_completed()方法是一个生成器,在没有任务完成的时候,会阻塞,在有某个任务完成的时候,会yield这个任务,就能执行for循环下面的语句,然后继续阻塞住,循环到所有的任务结束。从结果也可以看出,先完成的任务会先通知主线程 输出:

4 2 5 Process finished with exit code 0 这里可以看出submit的返回是无序的

这里看下源码:

def submit(self, fn, *args, **kwargs): with self._shutdown_lock: if self._shutdown: raise RuntimeError('cannot schedule new futures after shutdown') f = _base.Future() w = _WorkItem(f, fn, args, kwargs) self._work_queue.put(w) self._adjust_thread_count() return f submit.__doc__ = _base.Executor.submit.__doc__

注意的是,它就是执行一个单独的函数,并且返回的是future对象(具体请看官方文档)。

方法三:使用ThreadPoolExecutor的map def thread_function(age): for i in age: yield i+1 def run_thread_pool(target, args, max_work_count=6): with ThreadPoolExecutor(max_workers=max_work_count) as t: res = t.map(target, args) return res if __name__ == '__main__': ages = [1, 3, 4] # 2222 res = run_thread_pool(target=thread_function, args=(ages,)) for j in res: for i in j: print i

输出:

2 4 5 Process finished with exit code 0 这里看出map的输出是有序的

这里看下map的源码:

def map(self, fn, *iterables, **kwargs): """Returns an iterator equivalent to map(fn, iter). Args: fn: A callable that will take as many arguments as there are passed iterables. timeout: The maximum number of seconds to wait. If None, then there is no limit on the wait time. Returns: An iterator equivalent to: map(func, *iterables) but the calls may be evaluated out-of-order. Raises: TimeoutError: If the entire result iterator could not be generated before the given timeout. Exception: If fn(*args) raises for any values. """ timeout = kwargs.get('timeout') if timeout is not None: end_time = timeout + time.time() fs = [self.submit(fn, *args) for args in itertools.izip(*iterables)] # Yield must be hidden in closure so that the futures are submitted # before the first iterator value is required. def result_iterator(): try: # reverse to keep finishing order fs.reverse() while fs: # Careful not to keep a reference to the popped future if timeout is None: yield fs.pop().result() else: yield fs.pop().result(end_time - time.time()) finally: for future in fs: future.cancel() return result_iterator()

它的参数是个iterables,所以当参数为列表字典等时,只需要写一个map函数就行了,而且它的返回值也是个iterable。 以上~

参考:[python] ThreadPoolExecutor线程池



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3